#!/usr/bin/python
# -*- coding: utf-8 -*-

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import parent
from testlib import *

import os
import re

RESOLV_SCRIPT = """
set -e
# HACK: Racing with operating systems reading/updating resolv.conf and
# the fact that resolv.conf can be a symbolic link. Avoid failures like:
# chattr: Operation not supported while reading flags on /etc/resolv.conf
for x in 1 2 3 4 5; do
    hostnamectl set-hostname x0.cockpit.lan
    printf 'domain cockpit.lan\nsearch cockpit.lan\nnameserver {address}\n' >/etc/resolv2.conf
    chcon -v unconfined_u:object_r:net_conf_t:s0 /etc/resolv2.conf || true
    mv /etc/resolv2.conf /etc/resolv.conf
    if chattr +i /etc/resolv.conf; then
        break
    else
        sleep $x
    fi
done
"""

def prepare_resolv(machine, address):
    machine.execute(script=RESOLV_SCRIPT.format(address=address))

@skipImage("No realmd available", "continuous-atomic", "fedora-atomic", "rhel-atomic")
@skipImage("No freeipa available", "debian-8", "debian-testing")
class TestRealms(MachineCase):
    additional_machines = {
        'ipa': { 'machine': { 'image': 'ipa' }, 'start': { 'memory_mb': 2048 } }
    }

    def testIpa(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/system")

        # Use the FreeIPA instance as the DNS server
        prepare_resolv(m, self.machines['ipa'].address)

        # Wait for DNS to work as expected.
        # https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11
        #
        wait(lambda: m.execute("nslookup -type=SRV _ldap._tcp.cockpit.lan"))

        def wait_number_domains(n):
            if n == 0:
                b.wait_text("#system-info-domain a", "Join Domain")
            else:
                b.wait_text_not("#system-info-domain a", "Join Domain")
            b.wait_not_attr("#system-info-domain a", "disabled", "disabled")

        wait_number_domains(0)

        # Join cockpit.lan
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        with b.wait_timeout(180):
            b.set_val(".realms-op-address", "cockpit.lan")
            b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"')
            b.set_val(".realms-op-admin", "admin")
            b.set_val(".realms-op-admin-password", "foobarfoo")
            b.click(".realms-op-apply")
            b.wait_popdown("realms-op")

            # Check that this has worked
            wait_number_domains(1)

        # Leave the domain
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        b.click(".realms-op-apply")
        b.wait_popdown("realms-op")
        wait_number_domains(0)

        # Send a wrong password
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        b.set_val(".realms-op-address", "cockpit.lan")
        b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"')
        b.set_val(".realms-op-admin", "admin")
        b.set_val(".realms-op-admin-password", "foo")
        b.click(".realms-op-apply")
        b.wait_text_not(".realms-op-error", "")
        b.click(".realms-op-cancel")
        b.wait_popdown("realms-op")

        # Try to join a non-existing domain
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        b.set_val(".realms-op-address", "NOPE")
        b.wait_js_cond("$('.realms-op-address-error').attr('title') != ''")
        b.click(".realms-op-cancel")
        b.wait_popdown("realms-op")

        # Cancel a join
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        b.set_val(".realms-op-address", "cockpit.lan")
        b.wait_attr(".realms-op-admin", "placeholder", 'e.g. "admin"')
        b.set_val(".realms-op-admin", "admin")
        b.set_val(".realms-op-admin-password", "foobarfoo")
        b.click(".realms-op-apply")
        b.wait_visible(".realms-op-spinner")
        b.click(".realms-op-cancel")
        b.wait_popdown("realms-op")

        # HACK sssd regression
        # https://github.com/cockpit-project/cockpit/issues/5142
        # https://bugzilla.redhat.com/show_bug.cgi?id=1380953
        self.allow_journal_messages('.*denied.*{ getattr } for.*comm="sssd" name="/".*')

    def testNotSupported(self):
        m = self.machine
        b = self.browser

        # Disable sssd support in realmd

        realmd_distro_conf = "/usr/lib/realmd/realmd-distro.conf"
        if "rhel" in m.image or "centos" in m.image:
            realmd_distro_conf = "/usr/lib64/realmd/realmd-distro.conf"

        m.execute("echo -e '[providers]\nsssd = no\n' >>%s" % realmd_distro_conf)

        self.login_and_go("/system")

        # Use the FreeIPA instance as the DNS server
        prepare_resolv(m, self.machines['ipa'].address)

        # Wait for DNS to work as expected.
        # https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11
        #
        wait(lambda: m.execute("nslookup -type=SRV _ldap._tcp.cockpit.lan"))

        # Join cockpit.lan
        b.click("#system-info-domain a")
        b.wait_popup("realms-op")
        b.set_val(".realms-op-address", "cockpit.lan")
        b.wait_js_cond("$('.realms-op-address-error').attr('title') != ''")
        b.set_val(".realms-op-admin", "admin")
        b.set_val(".realms-op-admin-password", "foobarfoo")
        b.click(".realms-op-apply")
        b.wait_text(".realms-op-error", "Joining this domain is not supported")

JOIN_SCRIPT = """
set -ex
# HACK: https://bugzilla.redhat.com/show_bug.cgi?id=1071356#c11
for x in $(seq 1 20); do
    if nslookup -type=SRV _ldap._tcp.cockpit.lan; then
        break
    else
        sleep $x
    fi
done

if ! echo '%(password)s' | realm join -U admin cockpit.lan; then
    journalctl -u realmd.service
fi

echo '%(password)s' | kinit -f admin@COCKPIT.LAN

# HACK: https://bugzilla.redhat.com/show_bug.cgi?id=1144292
curl --insecure -s --negotiate -u : https://f0.cockpit.lan/ipa/json --header 'Referer: https://f0.cockpit.lan/ipa' --header "Content-Type: application/json" --header "Accept: application/json" --data '{"params": [["HTTP/x0.cockpit.lan@COCKPIT.LAN"], {"raw": false, "all": false, "version": "2.101", "force": true, "no_members": false, "ipakrbokasdelegate": true}], "method": "service_add", "id": 0}'
ipa-getkeytab -q -s f0.cockpit.lan -p HTTP/x0.cockpit.lan -k /etc/krb5.keytab

# HACK: https://bugs.freedesktop.org/show_bug.cgi?id=98479
if ! grep -q 'services.*nss' /etc/sssd/sssd.conf; then
    sed -i 's/^services = sudo, ssh$/services = sudo, ssh, nss, pam/' /etc/sssd/sssd.conf
    systemctl restart sssd
fi

# This needs to work
getent passwd admin@cockpit.lan

# This directory should be owned by the domain user
chown -R admin@cockpit.lan /home/admin
"""

# This is here because our test framework can't run ipa VM's twice
@skipImage("No realmd available", "continuous-atomic", "fedora-atomic", "rhel-atomic")
@skipImage("No freeipa available", "debian-8", "debian-testing")
class TestKerberos(MachineCase):
    additional_machines = {
        'ipa': { 'machine': { 'image': 'ipa' }, 'start': { 'memory_mb': 2048 } }
    }

    def configure_kerberos(self):
        # Setup a place for kerberos caches
        args = { "addr": self.machines['ipa'].address, "password": "foobarfoo" }
        prepare_resolv(self.machine, self.machines['ipa'].address)
        self.machine.execute(script=JOIN_SCRIPT % args)

    def tearDown(self):
        if 'KRB5CCNAME' in os.environ:
            del os.environ['KRB5CCNAME']
        if 'KRB_CONFIG' in os.environ:
            del os.environ['KRB5_CONFIG']

        MachineCase.tearDown(self)

    def testNegotiate(self):
        self.allow_authorize_journal_messages()
        self.allow_hostkey_messages()
        b = self.browser

        # HACK: There is no operating system where the domain admins are admins by default
        # This is something that needs to be worked on at an OS level. We use admin level
        # features below, such as adding a machine to the dashboard
        self.machine.execute("echo 'admin@cockpit.lan        ALL=(ALL)       NOPASSWD: ALL' >> /etc/sudoers")

        # Make sure negotiate auth is not offered first
        self.machine.start_cockpit()

        output = self.machine.execute(['/usr/bin/curl -v -s',
                                       '--resolve', 'x0.cockpit.lan:9090:%s' % self.machine.address,
                                       'http://x0.cockpit.lan:9090/cockpit/login', '2>&1'])
        self.assertIn("HTTP/1.1 401", output)
        self.assertNotIn("WWW-Authenticate: Negotiate", output)

        self.configure_kerberos()
        self.machine.restart_cockpit()

        output = self.machine.execute(['/usr/bin/curl', '-s', '--negotiate', '--delegation', 'always', '-u', ':', "-D", "-",
                                       '--resolve', 'x0.cockpit.lan:9090:%s' % self.machine.address,
                                       'http://x0.cockpit.lan:9090/cockpit/login'])
        self.assertIn("HTTP/1.1 200 OK", output)
        self.assertIn('"admin@cockpit.lan"', output)

        cookie = re.search("Set-Cookie: cockpit=([^ ;]+)", output).group(1)
        b.open("/system/terminal", cookie={ "name": "cockpit", "value": cookie, "domain": self.machine.address, "path": "/" })
        b.wait_present('#content')
        b.wait_visible('#content')
        b.enter_page("/system/terminal")
        b.wait_present(".terminal")
        b.focus(".terminal")

        def line_sel(i):
            return '.terminal > div:nth-child(%d)' % i

        def line_text(t):
            return t + u'\xa0'*(80-len(t))

        # wait for prompt in first line
        b.wait_text_not(line_sel(1), line_text(""))

        # run kinit and see if got forwarded the kerberos ticket into the session
        b.key_press(list("klist") + [ "Return" ])
        b.wait_text_not(line_sel(2), line_text(""))
        text = b.text(line_sel(2)).replace(u"\xa0", " ").strip()
        self.assertIn("Ticket cache", text)

        # Now connect to another machine
        self.assertNotIn("admin@cockpit.lan", self.machine.execute("ps -xa | grep sshd"))
        b.switch_to_top()
        b.go("/@x0.cockpit.lan/system/terminal")
        b.wait_visible("#machine-troubleshoot")
        b.click("#machine-troubleshoot")
        b.wait_popup('troubleshoot-dialog')
        b.wait_present("#troubleshoot-dialog .btn-primary:not([disabled])")
        b.wait_text('#troubleshoot-dialog .btn-primary', "Add")
        b.click('#troubleshoot-dialog .btn-primary')
        b.wait_in_text('#troubleshoot-dialog', "Fingerprint")
        b.click('#troubleshoot-dialog .btn-primary')
        b.wait_popdown('troubleshoot-dialog')
        b.enter_page("/system/terminal", host="x0.cockpit.lan")
        b.wait_present(".terminal")
        b.wait_visible(".terminal")

        # Make sure we connected via SSH
        self.assertIn("admin@cockpit.lan", self.machine.execute("ps -xa | grep sshd"))

if __name__ == '__main__':
    test_main()
